1 /*
2 Copyright: Marcelo S. N. Mancini (Hipreme|MrcSnm), 2018 - 2021
3 License:   [https://creativecommons.org/licenses/by/4.0/|CC BY-4.0 License].
4 Authors: Marcelo S. N. Mancini
5 
6 	Copyright Marcelo S. N. Mancini 2018 - 2021.
7 Distributed under the CC BY-4.0 License.
8    (See accompanying file LICENSE.txt or copy at
9 	https://creativecommons.org/licenses/by/4.0/
10 */
11 module hip.concurrency.thread;
12 import hip.concurrency.mutex;
13 import hip.config.opts;
14 
15 static if(HipConcurrency)
16 {
17     import core.thread;
18     import core.atomic;
19     import core.sync.semaphore:Semaphore;
20 
21     class HipWorkerThread : Thread
22     {
23         private struct WorkerJob
24         {
25             string name;
26             void delegate() task;
27             void delegate(string taskName) onTaskFinish;
28         }
29         private WorkerJob[] jobsQueue;
30         private Semaphore semaphore;
31         private bool isAlive = false;
32 
33         private int jobsCount;
34         private DebugMutex mutex;
35         private HipWorkerPool pool;
36         private ThreadID mainThreadID;
37 
38 
39         this(HipWorkerPool pool = null, ThreadID mainThreadID = ThreadID.init)
40         {
41             super(&run);
42             if(pool)
43                 this.pool = pool;
44             isAlive = true;
45             semaphore = new Semaphore;
46             this.mainThreadID = mainThreadID;
47             mutex = new DebugMutex(mainThreadID);
48         }
49         /**
50         *   This thread goes into an invalid state after finishing it. It should not be used anymore
51         */
52         void finish()
53         {
54             isAlive.atomicStore = false;
55             semaphore.notify;
56         }
57         bool isIdle()
58         {
59             return atomicLoad(jobsCount) == 0;
60         }
61         /**
62         *   Synchronized push on queue
63         */
64         void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null)
65         {
66             if(isAlive.atomicLoad)
67             {
68                 mutex.lock();
69                 jobsQueue~= WorkerJob(name, task, onTaskFinish);
70                 jobsCount++;
71                 mutex.unlock();
72                 semaphore.notify();
73             }
74             else
75             {
76                 import hip.console.log;
77                 logln("Thread is not alive to get tasks.");
78             }
79         }
80 
81         void startWorking()
82         {
83             if(!isRunning)
84                 start();
85         }
86         void await(bool rethrow = true)
87         {
88             // pushTask("await", () => finish);
89             // join(rethrow);
90         }
91 
92         void run()
93         {
94             while(isAlive)
95             {
96                 if(!isIdle)
97                 {
98                     mutex.lock();
99                     WorkerJob job = jobsQueue[0];
100                     jobsQueue = jobsQueue[1..$];
101                     mutex.unlock();
102                     try
103                     {
104                         job.task();
105                         if(job.onTaskFinish != null)
106                             job.onTaskFinish(job.name);
107                         atomicFetchSub(jobsCount, 1);
108                     }
109                     catch(Error e)
110                     {
111                         onAnyException(true, job.name, e.toString());
112                         return;
113                     }
114                     catch(Exception e)
115                     {
116                         onAnyException(false, job.name, e.toString());
117                         return;
118                     }
119                 }
120                 semaphore.wait;
121             }
122         }
123 
124         private void onAnyException(bool isError, string jobName, string message)
125         {
126             isAlive = false;
127             if(pool)
128                 pool.onHipThreadError(this, jobName, isError,message);
129         }
130         void dispose()
131         {
132             finish();
133             destroy(semaphore);
134             destroy(mutex);
135         }
136     }
137 
138 
139     class HipWorkerPool
140     {
141         HipWorkerThread[] threads;
142         protected Semaphore awaitSemaphore;
143         protected void delegate()[] finishHandlersOnMainThread;
144         protected void delegate()[] onAllTasksFinishHandlers;
145         protected DebugMutex handlersMutex;
146 
147         private struct Task
148         {
149             string name;
150             void delegate() task;
151             void delegate(string taskName) onTaskFinish = null;
152 
153             void execTask()
154             {
155                 task();
156                 if(onTaskFinish)
157                     onTaskFinish(name);
158             }
159         }
160         private Task[] mainThreadTasks;
161         private uint awaitCount = 0;
162         private shared size_t tasksCount;
163 
164 
165         this(size_t poolSize)
166         {
167             threads = new HipWorkerThread[](poolSize);
168             import hip.concurrency.internal:thisThreadID;
169             auto mainId = thisThreadID;
170             handlersMutex = new DebugMutex(mainId);
171             for(size_t i = 0; i < poolSize; i++)
172                 threads[i] = new HipWorkerThread(this, mainId);
173             awaitSemaphore = new Semaphore(0);
174         }
175 
176         void addOnAllTasksFinished(void delegate() onAllFinished)
177         {
178             if(tasksCount == 0)
179                 onAllFinished();
180             else
181                 onAllTasksFinishHandlers~= onAllFinished;
182         }
183 
184         protected void onHipThreadError(HipWorkerThread worker, string jobName, bool isError, string message)
185         {
186             if(awaitCount > 0)
187             {
188                 awaitSemaphore.notify();
189             }
190             import hip.util.array;
191             import hip.console.log;
192 
193 
194             logln("Worker ", jobName, " failed with ", isError ? "error" : "exception", ":", message);
195             threads.remove(worker);
196         }
197         void await()
198         {
199             awaitCount = 0;
200             foreach(thread; threads)
201             {
202                 if(!thread.isIdle)
203                 {
204                     thread.pushTask("Await", ()
205                     {
206                         awaitSemaphore.notify;
207                     });
208                     awaitCount++;
209                 }
210             }
211             startWorking();
212             while(awaitCount > 0)
213             {
214                 awaitSemaphore.wait();
215                 awaitCount--;
216             }
217         }
218         /**
219         *   Adds a task to the pool. If no idle worker is available, the task is executed on the main thread.
220         *   Keep in mind that pushin task is not enough. You need to call `startWorking()` to make it active after pushing tasks
221         * Params:
222         *   name = The name of the task.
223         *   task = The task to execute.
224         *   onTaskFinish = Callback to execute when the task completes.
225         *   isOnFinishOnMainThread = If true, the callback is executed on the main thread.
226         *
227         * Returns:
228         *   The worker thread handling the task or null if the task will be executed on main thread
229         */
230         HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = false)
231         {
232             atomicFetchAdd(tasksCount, 1);
233             foreach(i, thread; threads)
234             {
235                 if(thread.isIdle)
236                 {
237                     import hip.console.log;
238                     logln("Thread [", i, "] handling task ", name);
239                     if(onTaskFinish !is null && isOnFinishOnMainThread)
240                         thread.pushTask(name, task, notifyOnFinishOnMainThread(onTaskFinish));
241                     else
242                         thread.pushTask(name, task, notifyOnFinish(onTaskFinish));
243                     return thread;
244                 }
245             }
246             handlersMutex.lock();
247             scope(exit) handlersMutex.unlock();
248             //Execute a main thread task if it had anything.
249             mainThreadTasks~= Task(name, task, notifyOnFinish(onTaskFinish));
250             return null;
251         }
252 
253         static if(!HIP_ASSETMANAGER_PARTIAL_LOAD)
254         {
255             protected void executeMainThreadTasks()
256             {
257                 handlersMutex.lock();
258                 Task[] tasks;
259                 if(mainThreadTasks.length != 0)
260                 {
261                     tasks = mainThreadTasks.dup;
262                     mainThreadTasks.length = 0;
263                 }
264                 handlersMutex.unlock();
265                 foreach(t; tasks)
266                     t.execTask();
267             }
268         }
269         else
270         {
271             protected void executeMainThreadTasks()
272             {
273                 handlersMutex.lock();
274                 scope(exit) handlersMutex.unlock();
275                 import hip.util.time;
276                 long timeNow = HipTime.getCurrentTimeAsMsLong();
277 
278                 size_t executed;
279                 foreach(t; mainThreadTasks)
280                 {
281                     t.execTask();
282                     executed++;
283                     if(HipTime.getCurrentTimeAsMsLong() - timeNow > HIP_ASSETMANAGER_MAX_PROCESS_MS)
284                         break;
285                 }
286                 mainThreadTasks = mainThreadTasks[executed..$];
287             }
288         }
289 
290 
291         /**
292         *   This function should be called every time you push a task.
293         */
294         void startWorking()
295         {
296             foreach(thread; threads)
297                 if(!thread.isIdle)
298                     thread.startWorking();
299             executeMainThreadTasks();
300         }
301 
302         void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish = null)
303         {
304             return (name)
305             {
306                 if(onFinish)
307                     onFinish(name);
308                 atomicFetchSub(tasksCount, 1);
309             };
310         }
311 
312         void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true)
313         {
314             return (name)
315             {
316                 handlersMutex.lock();
317                     finishHandlersOnMainThread~= ()
318                     {
319                         onFinish(name);
320                         if(finished)
321                             atomicFetchSub(tasksCount, 1);
322                     };
323                 handlersMutex.unlock();
324             };
325         }
326         int getTasksCount()
327         {
328             return cast(int)atomicLoad(tasksCount);
329         }
330 
331         bool isIdle()
332         {
333             return atomicLoad(tasksCount) == 0;
334         }
335 
336         void pollFinished()
337         {
338             handlersMutex.lock();
339                 if(finishHandlersOnMainThread.length)
340                 {
341                     foreach(finishHandler; finishHandlersOnMainThread)
342                         finishHandler();
343                     finishHandlersOnMainThread.length = 0;
344                 }
345                 if(tasksCount == 0 && onAllTasksFinishHandlers.length)
346                 {
347                     foreach(onAllFinish; onAllTasksFinishHandlers)
348                         onAllFinish();
349                     onAllTasksFinishHandlers.length = 0;
350                 }
351             handlersMutex.unlock();
352 
353         }
354 
355         void dispose()
356         {
357             foreach(thread; threads)
358                 thread.dispose();
359             destroy(threads);
360             destroy(awaitSemaphore);
361             destroy(handlersMutex);
362         }
363     }
364 
365 }
366 else
367 {
368     
369     class HipWorkerPool
370     {
371         private HipWorkerThread thread;
372         protected void delegate()[] onAllTasksFinishHandlers;
373         private void delegate()[] finishHandlersOnMainThread;
374         size_t tasksCount = 0;
375         void addOnAllTasksFinished(void delegate() onAllFinished)
376         {
377             if(tasksCount == 0)
378                 onAllFinished();
379             else
380                 onAllTasksFinishHandlers~= onAllFinished;
381         }
382 
383         this(size_t poolSize)
384         {
385             thread = new HipWorkerThread(this, ulong.max);
386         }
387         void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true)
388         {
389             return (name)
390             {
391                 finishHandlersOnMainThread~= ()
392                 {
393                     onFinish(name); 
394                     if(finished)
395                         tasksCount--;
396                 };
397             };
398         }
399 
400         void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish)
401         {
402             return (name)
403             {
404                 if(onFinish) onFinish(name);
405                 version(WebAssembly){}
406                 else
407                     tasksCount--;
408             };
409         }
410         final void signalTaskFinish()
411         {
412             assert(tasksCount > 0, "Tried to signal task finish without tasks.");
413             tasksCount--;
414         }
415         final void await()
416         {
417             version(WebAssembly) assert(false, "Code using await does not work on WebAssembly.");
418         }
419         final void pollFinished()
420         {
421             if(finishHandlersOnMainThread.length)
422             {
423                 foreach(handler; finishHandlersOnMainThread)
424                     handler();
425                 finishHandlersOnMainThread.length = 0;
426             }
427             if(tasksCount == 0 && onAllTasksFinishHandlers.length)
428             {
429                 foreach(onAllFinish; onAllTasksFinishHandlers)
430                     onAllFinish();
431                 onAllTasksFinishHandlers.length = 0;
432             }
433         }
434         int getTasksCount()
435         {
436             return cast(int)tasksCount;
437         }
438 
439         final HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = true)
440         {
441             tasksCount++;
442             version(WebAssembly)
443                 assert(onTaskFinish is null, "Can't have an onTaskFinish on Wasm, implement it on a higher level using notfyOnFinish.");
444             thread.pushTask(name, task, notifyOnFinish(onTaskFinish));
445             return thread;
446         }
447         final void startWorking(){thread.startWorking();}
448         final void finish(){}
449         final bool isIdle(){return thread.isIdle;}
450         final void dispose(){}
451     }
452     class HipWorkerThread
453     {
454         struct WorkerTask
455         {
456             void delegate() task;
457             void delegate(string taskName) onTaskFinish;
458             string name;
459         }
460         WorkerTask[] tasks;
461 
462         this(HipWorkerPool pool, ulong id){}
463         final void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null)
464         {
465             tasks~= WorkerTask(task, onTaskFinish, name);
466         }
467 
468         final void startWorking()
469         {
470             import hip.util.time;
471             long timeNow = HipTime.getCurrentTimeAsMsLong();
472             size_t executed;
473             foreach(task; tasks)
474             {
475                 task.task();
476                 if(task.onTaskFinish)
477                     task.onTaskFinish(task.name);
478                 executed++;
479                 static if(HIP_ASSETMANAGER_PARTIAL_LOAD)
480                 {
481                     if(HipTime.getCurrentTimeAsMsLong() - timeNow > HIP_ASSETMANAGER_MAX_PROCESS_MS)
482                         break;
483                 }
484             }
485             tasks = tasks[executed..$];
486         }
487 
488         bool isIdle()
489         {
490             assert(false, "HipWorkerThread is not reliable to use isIdle since on WASM it returns immediately most functions since they are processed on background.");
491         }
492     }
493 }